package com.lenxia.spark.base

import java.util

import kafka.serializer.StringDecoder
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

/**
  * Author  : Lenxia
  * Created : 2017/8/18
  * Updated : 2017/8/18
  * Version : 0.0.0
  * Contact : 2219708253@qq.com
  *
  * Comparison of the two ways Spark Streaming can consume from Kafka:
  * the receiver-based approach and the direct approach.
  *
  * Receiver-based approach:
  * 1. RDD parallelism is NOT one-to-one with Kafka partitions; to raise parallelism
  *    you must create multiple receivers via repeated KafkaUtils.createStream() calls.
  * 2. Data may be lost mid-computation; a Write-Ahead Log (WAL) is required to
  *    persist intermediate results and avoid loss.
  * 3. Consumer offsets are stored in ZooKeeper and are not visible to the client.
  * 4. A dedicated thread (core) is occupied by the receiver to ingest data; if that
  *    thread dies, no jobs can run.
  *
  * Direct approach:
  * 1. RDD partitions map one-to-one to Kafka partitions, so raising RDD parallelism
  *    only requires adding partitions to the topic.
  * 2. Data is not lost during computation (it can be recomputed), bounded by the
  *    topic's retention period.
  * 3. Offsets are kept in the checkpoint; the client has full control over them.
  */
object KafkaWordCount {

  /**
    * Receiver-based word count over a sliding window.
    *
    * NOTE(review): the name "recevier" is a typo for "receiver", kept unchanged
    * because it is part of this object's public interface.
    *
    * @param args <zkQuorum> <group> <topics> <numThreads> — ZooKeeper quorum,
    *             consumer group id, comma-separated topic list, and the number of
    *             receiver threads per topic.
    */
  def recevier(args: Array[String]): Unit = {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }

    // Unpack the external arguments.
    val Array(zkQuorum, group, topics, numThreads) = args

    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    // Checkpointing is mandatory: reduceByKeyAndWindow with an inverse function
    // keeps window state that must survive failures.
    ssc.checkpoint("checkpoint")

    // Each topic gets `numThreads` receiver threads.
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap

    // The stream yields (key, message) tuples; we only need the message payload.
    val pairs: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)

    val lines = pairs.map(_._2)
    // Streaming transformations: tokenize, then count per 10-minute window that
    // slides every 2 seconds. The inverse function (_ - _) lets Spark subtract
    // counts of data leaving the window instead of recomputing the whole window.
    val words = lines.flatMap(_.split(" "))

    val wordCounts = words.map(word => (word, 1L))
      .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)

    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }

  /**
    * Direct-stream word count, printing the Kafka offset range of each batch.
    *
    * @param args <brokers> <topics> — Kafka broker list and comma-separated topics.
    */
  def direct(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println(
        s"""
           |Usage: DirectKafkaWordCount <brokers> <topics>
           |  <brokers> is a list of one or more Kafka brokers
           |  <topics> is a list of one or more kafka topics to consume from
        """.stripMargin)
      System.exit(1)
    }

    val Array(brokers, topics) = args

    // Create context with 2 second batch interval
    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Holds the offset ranges of the current batch, captured below.
    var offsetRanges = Array[OffsetRange]()

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)

    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    // BUG FIX: offset ranges must be read from the RDD produced directly by the
    // Kafka stream, BEFORE any shuffle. The original code cast the RDD that came
    // out of reduceByKey to HasOffsetRanges; a shuffled RDD is no longer a
    // KafkaRDD, so that cast throws ClassCastException at runtime. Capturing the
    // offsets inside transform() on the source stream is the documented pattern.
    val withOffsets = messages.transform { rdd =>
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }

    // Get the lines, split them into words, count the words and print
    val lines = withOffsets.map(_._2)
    val words = lines.flatMap(_.split(" "))

    val wordCounts = words.map(word => (word, 1L)).reduceByKey(_ + _)

    // Report the Kafka offsets consumed by each batch.
    wordCounts.foreachRDD { _ =>
      for (offset <- offsetRanges) {
        // Offsets could be persisted to ZooKeeper or another store here as needed.
        println(s"${offset.topic} ${offset.partition} ${offset.fromOffset} ${offset.untilOffset}")
      }
    }

    wordCounts.print()

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }

  /**
    * Test-data producer: every second, sends `messagesPerSec` messages of
    * `wordsPerMessage` random single-digit words to the given topic.
    *
    * @param args <metadataBrokerList> <topic> <messagesPerSec> <wordsPerMessage>
    */
  def producer(args: Array[String]): Unit = {
    if (args.length < 4) {
      System.err.println(
        """
          |Usage: KafkaWordCountProducer <metadataBrokerList> <topic> <messagesPerSec> <wordsPerMessage>
          | <metadataBrokerList> is kafka brokers list,eg:broker1:9092,broker2:9092
          | <topic> is topic for name
          | <messagesPerSec> is message-nums in 1s.
          | <wordsPerMessage> is word-nums in per message.
        """.stripMargin
        )
      System.exit(1)
    }

    val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args

    // Kafka producer connection properties.
    val props = new util.HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)

    // Send messages forever, one burst per second.
    while (true) {
      (1 to messagesPerSec.toInt).foreach { _ =>
        val str = (1 to wordsPerMessage.toInt).map(_ => scala.util.Random.nextInt(10).toString)
          .mkString(" ")

        // Key is null: messages are distributed round-robin across partitions.
        val message = new ProducerRecord[String, String](topic, null, str)
        producer.send(message)
      }

      Thread.sleep(1000)
    }
  }

  /**
    * Entry point — uncomment exactly one of the modes below to run it.
    */
  def main(args: Array[String]): Unit = {
    //    recevier(args)
    //    direct(args)
  }
}
